---
title: How to limit concurrent task runs with tags
sidebarTitle: Limit concurrent task runs with tags
description: Prevent too many tasks from running simultaneously using tags.
keywords: [concurrency, concurrency limits, task tags, rate limiting, parallel execution, task orchestration, resource management, throttling]
---

Use task tags to limit how many tasks can run at the same time. This is useful when tasks interact with shared resources like databases or external APIs.

```python
from prefect import flow, task

@task(tags=["database"])
def fetch_data():
    # This task will respect the "database" tag's concurrency limit
    return "data"

@flow
def my_workflow():
    fetch_data()
```

For more details on how tag-based concurrency limits work, see the [tag-based concurrency limits concept page](/v3/concepts/tag-based-concurrency-limits).

<Note>
As of Prefect 3.4.19, tag-based concurrency limits are backed by [global concurrency limits](/v3/concepts/global-concurrency-limits). When you create a tag-based limit, you'll see a corresponding global concurrency limit with the name `tag:{tag_name}`.
</Note>

## Set concurrency limits on tags

Tags without concurrency limits allow unlimited concurrent runs. Once you set a limit, only that many tasks with the tag can run simultaneously.

```python
from prefect import flow, task

@task(tags=["database"])
def query_database(query: str):
    # Simulate database work
    return f"Results for {query}"

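@flow
def data_pipeline():
    # Submit the queries so they can run concurrently;
    # the "database" tag limit caps how many run at once
    futures = [
        query_database.submit(query)
        for query in (
            "SELECT * FROM users",
            "SELECT * FROM orders",
            "SELECT * FROM products",
        )
    ]
    # Wait for every submitted task run to finish
    return [future.result() for future in futures]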
```

## Apply multiple tags to a task

When a task has multiple tags, it must have available concurrency slots for **all** tags to run.

```python
from prefect import task

@task(tags=["database", "analytics"])
def complex_query():
    # This task needs available slots in both "database" AND "analytics" limits
    return "complex results"
```
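
The all-tags rule can be modeled in a few lines of plain Python. This is an illustrative sketch only, not Prefect's implementation: `can_start`, `limits`, and `active` are hypothetical names, and the numbers are made up for the example.

```python
def can_start(task_tags, limits, active):
    """Return True if every limited tag on the task has a free slot.

    limits: tag -> maximum concurrent runs (tags not listed are unlimited)
    active: tag -> number of runs currently holding a slot
    """
    return all(
        active.get(tag, 0) < limits[tag]
        for tag in task_tags
        if tag in limits
    )


# Hypothetical state: "database" has a free slot, "analytics" is full
limits = {"database": 2, "analytics": 1}
active = {"database": 1, "analytics": 1}

database_only = can_start(["database"], limits, active)
both_tags = can_start(["database", "analytics"], limits, active)
unlimited = can_start(["reporting"], limits, active)
```

In this model, the task tagged only `database` can start, the task with both tags is held back by the full `analytics` limit, and a tag with no limit never blocks.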

## Configure concurrency limits

You can configure limits using the CLI, Python client, API, Terraform, or the Prefect UI.

### CLI

```bash
# Set a limit of 10 for the "database" tag
prefect concurrency-limit create database 10

# View all concurrency limits
prefect concurrency-limit ls

# View details about a specific tag's limit
prefect concurrency-limit inspect database

# Delete a concurrency limit
prefect concurrency-limit delete database
```

### Python client

```python
import asyncio

from prefect import get_client


async def main():
    async with get_client() as client:
        # Set a concurrency limit of 10 on the "database" tag
        await client.create_concurrency_limit(
            tag="database",
            concurrency_limit=10,
        )

        # Read the current limit for a tag
        limit = await client.read_concurrency_limit_by_tag(tag="database")
        print(limit)

        # View all concurrency limits (paginated with limit and offset)
        limits = await client.read_concurrency_limits(limit=100, offset=0)
        print(limits)

        # Delete a concurrency limit
        await client.delete_concurrency_limit_by_tag(tag="database")


if __name__ == "__main__":
    asyncio.run(main())
```

### API

```bash
# Create a concurrency limit
curl -X POST "http://localhost:4200/api/concurrency_limits/" \
  -H "Content-Type: application/json" \
  -d '{"tag": "database", "concurrency_limit": 10}'

# Get all concurrency limits (listing uses the filter endpoint)
curl -X POST "http://localhost:4200/api/concurrency_limits/filter" \
  -H "Content-Type: application/json" \
  -d '{}'
```
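
If you would rather call the API from Python without the Prefect client, the create request can be sketched with the standard library. `build_create_limit_request` is a hypothetical helper name. The URL and payload mirror the curl example, and the sketch leaves out any authentication headers a secured server might require.

```python
import json
from urllib.request import Request, urlopen


def build_create_limit_request(api_url: str, tag: str, limit: int) -> Request:
    """Build the same POST request as the curl example."""
    payload = json.dumps({"tag": tag, "concurrency_limit": limit}).encode("utf-8")
    return Request(
        url=f"{api_url.rstrip('/')}/concurrency_limits/",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_limit_request("http://localhost:4200/api", "database", 10)

# Sending requires a running Prefect server, so it is left commented out:
# with urlopen(request) as response:
#     print(json.load(response))
```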

### Terraform

```hcl
resource "prefect_concurrency_limit" "database_limit" {
  tag               = "database"
  concurrency_limit = 10
}
```

## Configure globally

When a task run is delayed because no concurrency slot is available, it waits for a fixed interval before trying again. Set that interval, in seconds, with the following setting:

```bash
prefect config set PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS=60
```

<Note>
Set this value on the Prefect server, not on the Prefect client.
</Note>
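
The wait-and-retry behavior can be pictured as a polling loop. This is a simplified model, not Prefect's scheduling code: `wait_for_slot`, `try_acquire`, and the injected `sleep` function are hypothetical names used only for illustration.

```python
import time


def wait_for_slot(try_acquire, wait_seconds, sleep=time.sleep):
    """Poll until try_acquire() succeeds; return the total seconds spent waiting."""
    waited = 0
    while not try_acquire():
        sleep(wait_seconds)  # the task run is delayed for one wait interval
        waited += wait_seconds
    return waited


# Simulate a slot that frees up on the third attempt, without really sleeping
attempts = iter([False, False, True])
total_wait = wait_for_slot(
    lambda: next(attempts),
    wait_seconds=60,
    sleep=lambda seconds: None,
)
```

In this model, a longer wait time means fewer retries, but also a longer delay before a task notices that a slot has become free.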